14 实践课-MySQL 数据库连接与基础查询工具开发

MySQL 数据库连接与基础查询工具开发

关联:索引

AI 工具使用:数据库连接 / SQL 优化 / 报错排查提示词模板(学生可直接复制)

使用方法:把你的表结构、SQL、报错信息、环境(Windows/Conda/依赖版本)粘贴到 {你的内容};要求 AI 输出“步骤清单 + 可复制代码 + 校验点”,便于照做与复盘。

模板目录:

模板 1:生成最小可运行连接代码(带连接池 + 参数化查询)

你是 Python 实践课助教。请给我一份“最小可运行”的 MySQL 连接与查询示例,要求:
1)使用 mysql-connector-python;使用连接池(pool_name/pool_size);
2)配置从环境变量读取:MYSQL_HOST/MYSQL_PORT/MYSQL_USER/MYSQL_PASSWORD/MYSQL_DATABASE;
3)提供一个函数 select(sql, params) 只允许 SELECT,必须使用参数化查询,占位符用 %s;
4)返回结果为 list[dict](带字段名);
5)加入最基本安全校验:禁止多语句与危险关键字,给出可执行的报错信息与修复建议;
6)给出一段自测:跑一个条件查询和一个聚合查询。

我的环境与表结构:{你的内容}

模板 2:生成分拣场景 SQL(条件 + 聚合 + 优化)

下面是我分拣场景的 MySQL 表结构(含字段与索引)。请你:
1)生成 3 条 SQL:①条件查询(带排序与分页);②聚合查询(按产线/日期/品级汇总);③联表查询(分拣记录 + 批次 + 品质标准,可选再关联设备状态);
2)所有查询都必须可参数化(给出 SQL + params 示例);
3)指出每条 SQL 的潜在慢点,并给出索引建议或重写建议;
4)不要使用 SELECT *,只选必要字段。

表结构:{你的内容}

模板 3:SQL 注入风险评审(代码/SQL 安全审计)

请对我下面的数据库查询代码与 SQL 做一次安全审计,输出:
1)注入风险点(逐条指出:字符串拼接/动态表名/多语句/注释绕过等);
2)修复方案(用参数化、白名单、最小权限、只读账号等);
3)给出修复后的代码片段(可直接替换);
4)提供 3 个“恶意输入”用例用于验证是否已防注入。

我的代码与 SQL:{你的内容}

模板 4:连接报错排查(定位→修复→预防)

下面是我连接 MySQL 的报错信息、代码片段与环境信息。请按“定位→修复→预防”输出:
1)先判断属于哪类:账号权限/密码、网络与端口、防火墙、数据库名不存在、字符集、连接池耗尽、超时;
2)给出最短排查步骤(最多 8 步),每一步要写清我该检查什么、怎么验证;
3)给出修复后的代码改动建议(要点即可);
4)给出预防清单(3 条以内)。

我的内容:{你的内容}

  1. 场景提问:分拣线上出现“某批次苹果瑕疵率升高”,你需要快速回答两件事:①到底升高了吗?②是哪个设备/班次更明显?

1. 为什么先讲权限?

如果你使用的是学校统一 MySQL 环境,可能不允许你创建用户/建库:那就跳过“创建用户/建库”,直接使用老师发的只读账号与数据库名。

1. 登录 MySQL(命令行方式)

mysql -h 127.0.0.1 -P 3306 -u root -p

关键点解释:

2. 创建数据库与只读账号(演示版)

项目路径:08_greensort_system/scripts/db/00_create_db_and_user.sql

CREATE DATABASE IF NOT EXISTS greensort_db DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

# 可选操作
CREATE USER IF NOT EXISTS 'greensort_ro'@'%' IDENTIFIED BY 'Replace_With_Strong_Password';

GRANT SELECT ON greensort_db.* TO 'greensort_ro'@'%';

FLUSH PRIVILEGES;

关键点解释:

排错提示(遇到 collation 报错时用):

SELECT VERSION();
SHOW COLLATION LIKE 'utf8mb4%';

关键点解释:

安全提示(必须强调):

3. 建表:分拣场景业务数据结构(对齐项目数据库设计文档)

项目路径:08_greensort_system/scripts/db/01_schema.sql

USE greensort_db;

CREATE TABLE IF NOT EXISTS sorting_batch (
  id           BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
  batch_no     VARCHAR(32)  NOT NULL UNIQUE,
  fruit_type   VARCHAR(32)  NOT NULL DEFAULT 'xibeilei_apple',
  line_id      VARCHAR(16)  NOT NULL DEFAULT 'LINE-01',
  operator_id  BIGINT UNSIGNED DEFAULT NULL,
  start_time   DATETIME     NOT NULL,
  end_time     DATETIME     NULL,
  total_count  INT UNSIGNED NOT NULL DEFAULT 0,
  grade_a_count INT UNSIGNED NOT NULL DEFAULT 0,
  grade_b_count INT UNSIGNED NOT NULL DEFAULT 0,
  grade_c_count INT UNSIGNED NOT NULL DEFAULT 0,
  grade_d_count INT UNSIGNED NOT NULL DEFAULT 0,
  grade_e_count INT UNSIGNED NOT NULL DEFAULT 0,
  status       TINYINT      NOT NULL DEFAULT 1,
  remark       VARCHAR(255) NULL,
  created_at   DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at   DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  INDEX idx_batch_no (batch_no),
  INDEX idx_line_date (line_id, start_time),
  INDEX idx_fruit_type (fruit_type),
  INDEX idx_status (status)
);

CREATE TABLE IF NOT EXISTS sorting_record (
  id                  BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
  batch_id            BIGINT UNSIGNED NOT NULL,
  fruit_id            VARCHAR(64) NOT NULL UNIQUE,
  line_id             VARCHAR(32) NOT NULL,
  diameter_mm         FLOAT NOT NULL,
  color_coverage_pct  FLOAT NOT NULL,
  defect_count        INT NOT NULL DEFAULT 0,
  defect_types        JSON NULL,
  shape_index         FLOAT NOT NULL,
  surface_smoothness  FLOAT NOT NULL,
  overall_score       FLOAT NOT NULL,
  grade               ENUM('A','B','C','D','E') NOT NULL,
  target_zone         INT NOT NULL,
  cycle_time_ms       INT NOT NULL,
  image_path          VARCHAR(512) NULL,
  created_at          DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at          DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  INDEX idx_batch_created (batch_id, created_at),
  INDEX idx_line_grade_date (line_id, grade, created_at),
  INDEX idx_grade_time (grade, created_at),
  CONSTRAINT fk_sorting_batch FOREIGN KEY (batch_id) REFERENCES sorting_batch(id) ON DELETE RESTRICT
);

CREATE TABLE IF NOT EXISTS quality_standard (
  id                    BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
  fruit_type             VARCHAR(32) NOT NULL,
  grade                  ENUM('A','B','C','D','E') NOT NULL,
  min_diameter_mm        FLOAT NOT NULL,
  max_diameter_mm        FLOAT NULL,
  min_color_coverage_pct FLOAT NOT NULL,
  max_defect_count       INT NOT NULL,
  min_shape_index        FLOAT NOT NULL,
  target_zone            INT NOT NULL,
  weight_config          JSON NOT NULL,
  is_active              TINYINT(1) NOT NULL DEFAULT 1,
  version                INT NOT NULL DEFAULT 1,
  created_at             DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at             DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  UNIQUE KEY uk_fruit_grade_version (fruit_type, grade, version),
  INDEX idx_active (is_active)
);

CREATE TABLE IF NOT EXISTS equipment_status (
  id             BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
  device_id       VARCHAR(64) NOT NULL,
  device_type     ENUM('arm','agv','camera','conveyor') NOT NULL,
  line_id         VARCHAR(32) NULL,
  status          ENUM('online','offline','error','maintenance') NOT NULL,
  operation_mode  ENUM('auto','manual','idle') NULL,
  battery_percent INT NULL,
  position        JSON NULL,
  error_code      VARCHAR(32) NULL,
  last_heartbeat  DATETIME NULL,
  is_latest       TINYINT(1) NOT NULL DEFAULT 1,
  reported_at     DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  INDEX idx_device_latest (device_id, is_latest),
  INDEX idx_device_time (device_id, reported_at),
  INDEX idx_status_latest (status, is_latest)
);

关键点解释(对应项目设计文档的统一口径):

4. 插入少量演示数据(用于自测)

项目路径:08_greensort_system/scripts/db/02_seed.sql

USE greensort_db;

INSERT INTO sorting_batch(batch_no, fruit_type, line_id, start_time, status)
VALUES ('BATCH-20260409-001', 'xibeilei_apple', 'LINE-01', NOW(), 1);

INSERT INTO quality_standard(
  fruit_type, grade,
  min_diameter_mm, max_diameter_mm, min_color_coverage_pct, max_defect_count, min_shape_index,
  target_zone, weight_config, is_active, version
)
VALUES
('xibeilei_apple', 'A', 145, NULL, 90, 0, 90, 1, '{"着色":40, "果径":25, "缺陷":25, "果形":10}', 1, 1),
('xibeilei_apple', 'B', 135, 145, 80, 1, 85, 2, '{"着色":40, "果径":25, "缺陷":25, "果形":10}', 1, 1);

INSERT INTO sorting_record(
  batch_id, fruit_id, line_id,
  diameter_mm, color_coverage_pct, defect_count, defect_types, shape_index, surface_smoothness, overall_score,
  grade, target_zone, cycle_time_ms, image_path
)
VALUES
(1, 'APPLE-20260409-000001', 'LINE-01', 145.2, 92.0, 0, JSON_ARRAY(), 92.0, 88.0, 90.2, 'A', 1, 8200, '/uploads/2026/04/09/apple_000001.jpg'),
(1, 'APPLE-20260409-000002', 'LINE-01', 138.5, 85.0, 1, JSON_ARRAY('bruise'), 86.0, 80.0, 82.5, 'B', 2, 9100, '/uploads/2026/04/09/apple_000002.jpg');

INSERT INTO equipment_status(device_id, device_type, line_id, status, operation_mode, battery_percent, is_latest, last_heartbeat)
VALUES
('camera_001', 'camera', 'LINE-01', 'online', 'auto', NULL, 1, NOW()),
('agv_001', 'agv', 'LINE-01', 'online', 'auto', 78, 1, NOW());

关键点解释:

1. 条件查询:按批次与等级筛选(带排序与分页)

SELECT
  id, batch_id, fruit_id, line_id, diameter_mm, overall_score, grade, target_zone, created_at
FROM sorting_record
WHERE batch_id = 1
  AND grade IN ('A', 'B')
ORDER BY created_at DESC
LIMIT 20;

关键点解释:

2. 聚合查询:按产线统计今日品级分布与平均分拣周期

SELECT
  grade,
  COUNT(*)           AS cnt,
  AVG(cycle_time_ms) AS avg_cycle_time_ms
FROM sorting_record
WHERE line_id = 'LINE-01'
  AND created_at >= CURDATE()
  AND created_at < DATE_ADD(CURDATE(), INTERVAL 1 DAY)
GROUP BY grade
ORDER BY grade;

关键点解释:

3. 联表查询(进阶预告):把“分拣记录 → 批次 → 品质标准”串成可追溯证据

SELECT
  sr.created_at,
  sb.batch_no,
  sr.fruit_id,
  sr.line_id,
  sr.grade,
  sr.overall_score,
  qs.min_diameter_mm,
  qs.min_color_coverage_pct,
  qs.max_defect_count,
  qs.target_zone AS standard_target_zone
FROM sorting_record sr
JOIN sorting_batch sb ON sb.id = sr.batch_id
JOIN quality_standard qs
  ON qs.fruit_type = sb.fruit_type
 AND qs.grade = sr.grade
 AND qs.is_active = 1
WHERE sr.batch_id = 1
ORDER BY sr.created_at DESC
LIMIT 20;

关键点解释:


  1. 目标提醒:今天不是写“任意 SQL 执行器”,而是写“安全可控的查询工具”。

二、项目视角:把代码串起来(文件级连线)

工具函数访问数据库:两种方式怎么选(学生必读)

对比要点:

本的选择:

包导入运行示例(建议直接复制):

cd 08_greensort_system/backend

# 启动 FastAPI(项目正式接口路线)
python -m uvicorn app.main:app --host 127.0.0.1 --port 8000

1. 安装依赖(示例)

项目路径:08_greensort_system/backend/requirements.txt(依赖清单)

pip install mysql-connector-python==8.4.0 sqlalchemy==2.0.30 aiomysql==0.2.0 httpx==0.27.0 python-dotenv==1.2.2

关键点解释:

2. 配置 .env(不要提交仓库)

项目路径:08_greensort_system/backend/.env.example(复制为 .env 使用)

MYSQL_HOST=127.0.0.1
MYSQL_PORT=3306
MYSQL_USER=greensort_ro
MYSQL_PASSWORD=Replace_With_Strong_Password
MYSQL_DATABASE=greensort_db
MYSQL_READONLY_USER=greensort_ro
MYSQL_READONLY_PASSWORD=Replace_With_Strong_Password
MYSQL_POOL_NAME=sorting_pool
MYSQL_POOL_SIZE=5
MYSQL_CONN_TIMEOUT=5

关键点解释:

3.(可选对照)直连数据库原型:MySQLClient(mysql-connector-python)

项目路径:08_greensort_system/backend/common/mysql_client.py

定位说明(先讲清边界):

import os
import re
import uuid
from dataclasses import dataclass
from typing import Any

from dotenv import load_dotenv
from mysql.connector.pooling import MySQLConnectionPool

_DISALLOWED_SQL = re.compile(
    r"(?is)\b(insert|update|delete|drop|alter|truncate|create|grant|revoke)\b|;|--|/\*|\*/"
)

@dataclass(frozen=True)
class QueryResult:
    trace_id: str
    rows: list[dict[str, Any]]

class MySQLClient:
    def __init__(self) -> None:
        load_dotenv()
        self._pool = self._build_pool()

    def _build_pool(self) -> MySQLConnectionPool:
        host = os.getenv("MYSQL_HOST", "127.0.0.1")
        port = int(os.getenv("MYSQL_PORT", "3306"))
        user = os.getenv("MYSQL_USER", "")
        password = os.getenv("MYSQL_PASSWORD", "")
        database = os.getenv("MYSQL_DATABASE", "")
        pool_name = os.getenv("MYSQL_POOL_NAME", "sorting_pool")
        pool_size = int(os.getenv("MYSQL_POOL_SIZE", "5"))
        conn_timeout = int(os.getenv("MYSQL_CONN_TIMEOUT", "5"))

        if not user or not password or not database:
            raise RuntimeError("MySQL 配置缺失:请检查 .env 中 MYSQL_USER/MYSQL_PASSWORD/MYSQL_DATABASE")

        return MySQLConnectionPool(
            pool_name=pool_name,
            pool_size=pool_size,
            host=host,
            port=port,
            user=user,
            password=password,
            database=database,
            connection_timeout=conn_timeout,
        )

    def select(self, sql: str, params: tuple[Any, ...] | None = None) -> QueryResult:
        trace_id = uuid.uuid4().hex[:8]
        normalized = (sql or "").strip()
        if not normalized:
            raise ValueError(f"EMPTY_SQL trace_id={trace_id}")

        if not normalized.lower().startswith("select"):
            raise ValueError(f"ONLY_SELECT_ALLOWED trace_id={trace_id}")

        if _DISALLOWED_SQL.search(normalized):
            raise ValueError(f"SQL_NOT_ALLOWED trace_id={trace_id}")

        cnx = self._pool.get_connection()
        try:
            cur = cnx.cursor(dictionary=True)
            cur.execute(normalized, params or ())
            rows = list(cur.fetchall())
            cur.close()
            return QueryResult(trace_id=trace_id, rows=rows)
        finally:
            cnx.close()

def _self_test() -> None:
    db = MySQLClient()

    r1 = db.select(
        """
        SELECT id, batch_id, fruit_id, line_id, diameter_mm, overall_score, grade, target_zone, created_at
        FROM sorting_record
        WHERE batch_id = %s AND grade IN (%s, %s)
        ORDER BY created_at DESC
        LIMIT %s
        """,
        (1, "A", "B", 20),
    )
    print("trace_id:", r1.trace_id, "rows:", len(r1.rows))

    r2 = db.select(
        """
        SELECT grade, COUNT(*) AS cnt, AVG(cycle_time_ms) AS avg_cycle_time_ms
        FROM sorting_record
        WHERE line_id = %s AND created_at >= %s AND created_at < %s
        GROUP BY grade
        ORDER BY grade
        """,
        ("LINE-01", "2026-04-09 00:00:00", "2026-04-10 00:00:00"),
    )
    print("trace_id:", r2.trace_id, "rows:", r2.rows)

if __name__ == "__main__":
    _self_test()

关键点解释(自检用,建议逐条对照):

补充:项目一致实现(SQLAlchemy + aiomysql,了解即可)

项目路径:08_greensort_system/backend/app/core/database.py

import os

from dotenv import load_dotenv
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

load_dotenv()

def _get_env_int(key: str, default: int) -> int:
    v = os.getenv(key)
    if v is None or v == "":
        return default
    return int(v)

def _build_mysql_url_from_env(*, user_key: str, password_key: str) -> str:
    host = os.getenv("MYSQL_HOST", "127.0.0.1")
    port = os.getenv("MYSQL_PORT", "3306")
    database = os.getenv("MYSQL_DATABASE", "greensort_db")
    user = os.getenv(user_key, "greensort")
    password = os.getenv(password_key, "greensort123")
    return f"mysql+aiomysql://{user}:{password}@{host}:{port}/{database}?charset=utf8mb4"

DATABASE_URL = os.getenv("DATABASE_URL") or _build_mysql_url_from_env(
    user_key="MYSQL_USER",
    password_key="MYSQL_PASSWORD",
)
DATABASE_READONLY_URL = os.getenv("DATABASE_READONLY_URL") or _build_mysql_url_from_env(
    user_key="MYSQL_READONLY_USER",
    password_key="MYSQL_READONLY_PASSWORD",
)

POOL_SIZE = _get_env_int("SQLALCHEMY_POOL_SIZE", 20)
MAX_OVERFLOW = _get_env_int("SQLALCHEMY_MAX_OVERFLOW", 40)
POOL_RECYCLE = _get_env_int("SQLALCHEMY_POOL_RECYCLE", 3600)
POOL_PRE_PING = os.getenv("SQLALCHEMY_POOL_PRE_PING", "true").lower() in {"1", "true", "yes"}

READONLY_POOL_SIZE = _get_env_int("SQLALCHEMY_READONLY_POOL_SIZE", 10)
READONLY_MAX_OVERFLOW = _get_env_int("SQLALCHEMY_READONLY_MAX_OVERFLOW", 20)

engine = create_async_engine(
    DATABASE_URL,
    echo=False,
    pool_size=POOL_SIZE,
    max_overflow=MAX_OVERFLOW,
    pool_recycle=POOL_RECYCLE,
    pool_pre_ping=POOL_PRE_PING,
)

engine_readonly = create_async_engine(
    DATABASE_READONLY_URL,
    echo=False,
    pool_size=READONLY_POOL_SIZE,
    max_overflow=READONLY_MAX_OVERFLOW,
    pool_recycle=POOL_RECYCLE,
    pool_pre_ping=POOL_PRE_PING,
)

AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

关键点解释(与设计文档字段一一对应):

4. 项目版实现:FastAPI + SQLAlchemy(Repository/Service/Router,推荐结构)

项目设计文档采用 FastAPI + Pydantic 的后端服务层。真实项目中,查询通常不直接暴露“任意 SQL”,而是走:路由(API)→ Service(业务)→ Repository(数据访问)→ 数据库(AsyncSession/连接池)。下面给出一套与项目文档一致、且能把“分页/筛选/安全”讲清楚的最小结构。

项目路径:08_greensort_system/backend/app/core/database_deps.py

from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import AsyncSessionLocal

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        yield session

关键点解释:

项目路径:08_greensort_system/backend/schemas/sorting.py

from datetime import datetime
from typing import Any

from pydantic import BaseModel

class SortingRecordOut(BaseModel):
    id: int
    batch_id: int
    fruit_id: str
    line_id: str
    diameter_mm: float
    grade: str
    overall_score: float
    created_at: datetime
    image_path: str | None = None

class PageResponse(BaseModel):
    total: int
    page: int
    page_size: int
    items: list[SortingRecordOut]
    trace_id: str
    extra: dict[str, Any] | None = None

关键点解释:

项目路径:08_greensort_system/backend/repositories/sorting_repo.py

import uuid
from datetime import datetime, timedelta

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

class SortingRepository:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def list_records(
        self,
        *,
        page: int,
        page_size: int,
        line_id: str | None,
        grade: str | None,
        batch_id: int | None,
        start_date: str | None,
        end_date: str | None,
    ) -> tuple[int, list[dict], str]:
        trace_id = uuid.uuid4().hex[:8]
        if page < 1:
            raise ValueError("page must be >= 1")
        if page_size < 1 or page_size > 100:
            raise ValueError("page_size must be 1..100")

        where_parts: list[str] = []
        params: dict[str, object] = {}

        if line_id:
            where_parts.append("line_id = :line_id")
            params["line_id"] = line_id
        if grade:
            where_parts.append("grade = :grade")
            params["grade"] = grade
        if batch_id is not None:
            where_parts.append("batch_id = :batch_id")
            params["batch_id"] = batch_id

        if start_date:
            start_dt = datetime.strptime(start_date, "%Y-%m-%d")
            where_parts.append("created_at >= :start_dt")
            params["start_dt"] = start_dt
        if end_date:
            end_dt_exclusive = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
            where_parts.append("created_at < :end_dt_exclusive")
            params["end_dt_exclusive"] = end_dt_exclusive

        where_sql = (" WHERE " + " AND ".join(where_parts)) if where_parts else ""

        total_sql = text(f"SELECT COUNT(*) AS total FROM sorting_record{where_sql}")
        total = int((await self._session.execute(total_sql, params)).mappings().one()["total"])

        offset = (page - 1) * page_size
        data_sql = text(
            f"""
            SELECT id, batch_id, fruit_id, line_id, diameter_mm, grade, overall_score, created_at, image_path
            FROM sorting_record
            {where_sql}
            ORDER BY created_at DESC
            LIMIT :limit OFFSET :offset
            """
        )
        rows = (
            (await self._session.execute(data_sql, {**params, "limit": page_size, "offset": offset}))
            .mappings()
            .all()
        )
        return total, [dict(r) for r in rows], trace_id

关键点解释(与安全口径一致):

项目路径:08_greensort_system/backend/services/sorting_service.py

from sqlalchemy.ext.asyncio import AsyncSession

from repositories.sorting_repo import SortingRepository

class SortingService:
    def __init__(self, session: AsyncSession) -> None:
        self._repo = SortingRepository(session)

    async def list_records(self, **kwargs):
        return await self._repo.list_records(**kwargs)

关键点解释:

项目路径:08_greensort_system/backend/app/api/v1/sorting.py

from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database_deps import get_db
from schemas.sorting import PageResponse, SortingRecordOut
from services.sorting_service import SortingService

router = APIRouter(prefix="/api/v1/sorting", tags=["sorting"])

@router.get("/records", response_model=PageResponse)
async def list_sorting_records(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    line_id: str | None = None,
    grade: str | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
    batch_id: int | None = None,
    db: AsyncSession = Depends(get_db),
):
    service = SortingService(db)
    total, rows, trace_id = await service.list_records(
        page=page,
        page_size=page_size,
        line_id=line_id,
        grade=grade,
        batch_id=batch_id,
        start_date=start_date,
        end_date=end_date,
    )
    return PageResponse(
        total=total,
        page=page,
        page_size=page_size,
        items=[SortingRecordOut(**r) for r in rows],
        trace_id=trace_id,
    )

关键点解释:

5. 将接口封装成“分拣场景工具函数”(Tool 调用 FastAPI,推荐)

项目路径:08_greensort_system/backend/tools/query_sorting_records.py

import json
import uuid

import httpx

def query_sorting_records(
    page: int = 1,
    page_size: int = 20,
    line_id: str | None = None,
    grade: str | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
    batch_id: int | None = None,
    api_base_url: str = "http://127.0.0.1:8000",
    token: str | None = None,
) -> str:
    trace_id = uuid.uuid4().hex[:8]
    if not api_base_url:
        return json.dumps({"ok": False, "error": "api_base_url is empty", "trace_id": trace_id}, ensure_ascii=False)

    api_base_url = api_base_url.rstrip("/")
    url = f"{api_base_url}/api/v1/sorting/records"

    headers: dict[str, str] = {}
    if token:
        headers["Authorization"] = f"Bearer {token}"

    params: dict[str, object] = {"page": page, "page_size": page_size}
    if line_id is not None:
        params["line_id"] = line_id
    if grade is not None:
        params["grade"] = grade
    if batch_id is not None:
        params["batch_id"] = batch_id
    if start_date is not None:
        params["start_date"] = start_date
    if end_date is not None:
        params["end_date"] = end_date

    try:
        with httpx.Client(timeout=10.0) as client:
            resp = client.get(url, params=params, headers=headers)
        return json.dumps(
            {
                "ok": resp.is_success,
                "trace_id": trace_id,
                "status_code": resp.status_code,
                "data": resp.json() if resp.headers.get("content-type", "").startswith("application/json") else resp.text,
            },
            ensure_ascii=False,
        )
    except Exception as e:
        return json.dumps({"ok": False, "trace_id": trace_id, "error": str(e)}, ensure_ascii=False)

运行建议(包导入方式):

cd 08_greensort_system/backend

# 先启动服务(另开一个终端窗口运行)
python -m uvicorn app.main:app --host 127.0.0.1 --port 8000

# 打开新的终端
python -c "from tools.query_sorting_records import query_sorting_records; print(query_sorting_records(page=1, page_size=10, line_id='LINE-01', grade='A', batch_id=1, start_date=None, end_date=None))"

关键点解释:

扩展:Tool 直连数据库(MySQLClient 方式,不推荐生产环境)

这个扩展用于理解“当 Tool 必须在受控环境里直连 DB 时,如何把直连查询封装成安全可控的工具函数”。真实项目更推荐上一节的“Tool 调用 FastAPI”方式。

项目路径:08_greensort_system/backend/common/mysql_client.py(复用 MySQLClient)

import json
import uuid
from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any

from common.mysql_client import MySQLClient

_ALLOWED_GRADES = {"A", "B", "C", "D", "E"}

def _to_jsonable(value: Any) -> Any:
    if value is None:
        return None
    if isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, (datetime, date)):
        return value.isoformat(sep=" ", timespec="seconds") if isinstance(value, datetime) else value.isoformat()
    if isinstance(value, Decimal):
        return float(value)
    if isinstance(value, bytes):
        try:
            return value.decode("utf-8")
        except UnicodeDecodeError:
            return value.hex()
    if isinstance(value, list):
        return [_to_jsonable(v) for v in value]
    if isinstance(value, dict):
        return {str(k): _to_jsonable(v) for k, v in value.items()}
    return str(value)

def _rows_to_jsonable(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    return [{k: _to_jsonable(v) for k, v in row.items()} for row in rows]

def query_sorting_records_db(
    *,
    line_id: str | None,
    grade: str | None,
    batch_id: int | None,
    start_date: str | None,
    end_date: str | None,
    page: int = 1,
    page_size: int = 20,
) -> str:
    trace_id = uuid.uuid4().hex[:8]

    if page < 1:
        return json.dumps({"ok": False, "trace_id": trace_id, "error": "page must be >= 1"}, ensure_ascii=False)
    if page_size < 1 or page_size > 100:
        return json.dumps({"ok": False, "trace_id": trace_id, "error": "page_size must be 1..100"}, ensure_ascii=False)

    if line_id is not None:
        line_id = line_id.strip()
        if not line_id:
            return json.dumps({"ok": False, "trace_id": trace_id, "error": "line_id is empty"}, ensure_ascii=False)

    if grade is not None:
        grade = grade.strip().upper()
        if grade not in _ALLOWED_GRADES:
            return json.dumps({"ok": False, "trace_id": trace_id, "error": "grade must be A/B/C/D/E"}, ensure_ascii=False)

    def _parse_ymd(s: str) -> datetime:
        return datetime.strptime(s, "%Y-%m-%d")

    where_parts: list[str] = []
    params_list: list[Any] = []

    if line_id is not None:
        where_parts.append("line_id = %s")
        params_list.append(line_id)
    if grade is not None:
        where_parts.append("grade = %s")
        params_list.append(grade)
    if batch_id is not None:
        where_parts.append("batch_id = %s")
        params_list.append(batch_id)

    if start_date is not None:
        try:
            start_dt = _parse_ymd(start_date)
        except ValueError:
            return json.dumps({"ok": False, "trace_id": trace_id, "error": "start_date must be YYYY-MM-DD"}, ensure_ascii=False)
        where_parts.append("created_at >= %s")
        params_list.append(start_dt)
    if end_date is not None:
        try:
            end_dt_exclusive = _parse_ymd(end_date) + timedelta(days=1)
        except ValueError:
            return json.dumps({"ok": False, "trace_id": trace_id, "error": "end_date must be YYYY-MM-DD"}, ensure_ascii=False)
        where_parts.append("created_at < %s")
        params_list.append(end_dt_exclusive)

    where_sql = (" WHERE " + " AND ".join(where_parts)) if where_parts else ""

    db = MySQLClient()

    count_sql = f"SELECT COUNT(*) AS total FROM sorting_record{where_sql}"
    count_res = db.select(count_sql, tuple(params_list))
    total = int(count_res.rows[0]["total"]) if count_res.rows else 0

    offset = (page - 1) * page_size
    data_sql = f"""
    SELECT id, batch_id, fruit_id, line_id, diameter_mm, grade, overall_score, created_at, image_path
    FROM sorting_record
    {where_sql}
    ORDER BY created_at DESC
    LIMIT %s OFFSET %s
    """
    data_params = tuple(params_list + [page_size, offset])
    data_res = db.select(data_sql, data_params)

    return json.dumps(
        {
            "ok": True,
            "trace_id": data_res.trace_id,
            "page": page,
            "page_size": page_size,
            "total": total,
            "records": _rows_to_jsonable(data_res.rows),
        },
        ensure_ascii=False,
    )

关键点解释(直连 DB 时必须做到):

运行建议(包导入方式):

cd 08_greensort_system/backend

python -c "from tools.query_sorting_records_db import query_sorting_records_db; print(query_sorting_records_db(line_id='LINE-01', grade='A', batch_id=1, start_date=None, end_date=None, page=1, page_size=10))"

1. 注入风险的典型坏例子(不要这么写)

项目路径:08_greensort_system/backend/examples/bad_sql_concat.py(反例)

sql = "SELECT * FROM sorting_record WHERE fruit_id = '" + fruit_id + "'"

为什么危险:

报错排查演练(学生必做 1 次):


  1. 配置 MySQL 数据库连接(含连接池、权限设置):能用只读账号成功连接并跑通自测。
  2. 分析分拣场景业务数据结构:能说明 sorting_record 的关键字段含义(至少 batch_id/fruit_id/line_id/diameter_mm/grade/overall_score/created_at),并能指出 quality_standardequipment_status 各自解决什么问题。
  3. 编写 2–3 个基础 SQL 查询语句:至少 1 条条件查询(对齐 API 的筛选字段:line_id/grade/batch_id/日期范围)+ 1 条聚合查询(品级分布/平均周期等)(可选 1 条联表追溯:记录→批次→标准)。
  4. 开发数据库查询工具:实现连接封装与查询执行,结果以 list[dict] 或 JSON 返回。
  5. 添加 SQL 注入防护逻辑:参数化 + 白名单 + 行数限制(至少满足两项)。
  6. 使用 AI 生成连接代码并优化自研查询语句:保留“原版→修正版→理由→自测证据”。
  7. 记录安全校验要点与问题:至少 3 条(写在提交说明里)。

课程思政(数据安全与工程伦理)


作业:不布置

1)提交数据库连接配置截图(含连接成功日志)及连接代码(带注释)。

2)提交分拣场景 SQL 查询语句及查询结果截图,附语句优化说明(含 AI 优化建议)。

3)提交数据库查询工具代码(含安全校验逻辑),附开发思路说明。